package org.databandtech.flinkstreaming;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.databandtech.flinkstreaming.Entity.EpgVod;
import org.databandtech.flinkstreaming.Entity.ItemCount;

/**
 * Flink streaming job: consumes JSON events from Kafka, converts them to
 * {@link EpgVod} records, computes windowed online-user counts (plus a TopN
 * ranking per window) and per-area counts, and writes the results to
 * row-format file sinks as well as stdout.
 *
 * <p>Deployment example (standalone cluster):
 * <pre>
 *   cd /usr/app/flink-1.13.2
 *   ./bin/start-cluster.sh
 *   bin/flink run --class org.databandtech.flinkstreaming.App /usr/app/flink-1.13.2/examples/rt.jar
 * </pre>
 */
public class KafkaConsumerApp {

	/** Base directory shared by all file sinks. */
	static String OUTPUTPATH = "D:\\sink";

	public static void main(String[] args) throws Exception {

		// Create the Flink execution environment.
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(3);

		// Heap-memory state backend.
		env.setStateBackend(new HashMapStateBackend());

		// Kafka connection parameters.
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "hadoop001:9092");
		properties.setProperty("group.id", "flink-group");
		String inputTopic = "Hello-Kafka";
		String outputTopicSink = "Hello-Kafka-out";
		String outputLateTopicSink = "Late-Topic";

		// Source: Kafka consumer reading plain strings.
		FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(),
				properties);

		// Checkpoint every 10 seconds.
		env.enableCheckpointing(10000);
		// Exactly-once mode (this is the default).
		env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
		// Leave at least 500 ms between the end of one checkpoint and the start of the next.
		env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
		// A checkpoint must complete within one minute or it is discarded.
		env.getCheckpointConfig().setCheckpointTimeout(60000);
		// Only one checkpoint may be in flight at a time.
		env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
		// Retain externalized checkpoints after job cancellation (manual cleanup required);
		// DELETE_ON_CANCELLATION would risk losing the checkpoint instead.
		env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
		// Enable the experimental unaligned checkpoints.
		env.getCheckpointConfig().enableUnalignedCheckpoints();
		// Checkpoint storage; for cluster deployments switch to an HDFS URI, e.g.
		// "hdfs://hadoop001:40010/flink/checkpoints".
		env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///flink/checkpoints"));

		// Since Flink 1.12 the default stream time characteristic is event time,
		// so no env.setStreamTimeCharacteristic(...) call is needed.

		// Start consuming from the earliest available offsets. Alternatives:
		// setStartFromLatest(), setStartFromTimestamp(ms), setStartFromGroupOffsets()
		// (the default), or setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>).
		consumer.setStartFromEarliest();

		// File sinks — all share the same rolling policy; see buildRowFormatSink().
		final StreamingFileSink<Tuple3<String, String, Integer>> sink = buildRowFormatSink();
		final StreamingFileSink<ItemCount> sink2 = buildRowFormatSink();
		final StreamingFileSink<List<ItemCount>> sink3 = buildRowFormatSink();

		// Raw kafka-json input stream.
		DataStream<String> stream = env.addSource(consumer);
		// stream.print(); // debug: print raw input

		// Typed stream, reused as the source of every downstream pipeline.
		DataStream<EpgVod> objStream = stream.map(new KafkaConsumerMapper());
		// objStream.print(); // debug: print converted input

		// Online-user count: pre-aggregate, then apply the window function.
		DataStream<ItemCount> onlineOut2 = objStream
				// Keep only the relevant system ids.
				.filter(item -> item.getSys_id().equals("m") || item.getSys_id().equals("u")
						|| item.getSys_id().equals("t"))
				// Event-time timestamps with a 5 s bounded-out-of-orderness watermark;
				// use WatermarkStrategy.<EpgVod>noWatermarks() with the same assigner
				// for event time without watermarks.
				.assignTimestampsAndWatermarks(WatermarkStrategy.<EpgVod>forBoundedOutOfOrderness(Duration.ofSeconds(5))
						.withTimestampAssigner(new KafkaTimestampAssigner()))
				// Partition by system id + action type.
				.keyBy(obj -> obj.getSys_id() + "-" + obj.getAction_type())
				// Tumbling processing-time window; a sliding event-time window such as
				// SlidingEventTimeWindows.of(Time.minutes(2L), Time.minutes(1L)) also works.
				.window(TumblingProcessingTimeWindows.of(Time.seconds(1L)))
				// Re-fire the window for late events arriving up to one minute past the watermark.
				.allowedLateness(Time.minutes(1))
				// .sideOutputLateData(new KafkaLateTag(outputLateTopicSink)) // side output for
				// events later than even the allowed lateness
				.aggregate(new KafkaAggFunction(), new KafkaWindowFunction("OnlineCount"));

		onlineOut2.print("onlineOut");
		// onlineOut2.addSink(sink2);

		// TopN ranking per window end (stateful keyed processing).
		DataStream<List<ItemCount>> onlineOutTopN = onlineOut2.keyBy(obj -> obj.getWindowEnd())
				.process(new KafkaTopNKeyedFunction(3));
		onlineOutTopN.print("TopN");
		onlineOutTopN.addSink(sink3);

		// Per-area user count: pre-aggregate + window function.
		DataStream<ItemCount> areaOut = objStream.keyBy(obj -> obj.getSys_id() + "-" + obj.getArea_code())
				.window(TumblingProcessingTimeWindows.of(Time.seconds(1L)))
				.aggregate(new KafkaAggFunction(), new KafkaWindowFunction("AreaCount"));

		areaOut.print("areaOut");
		areaOut.addSink(sink2);

		// Fail loudly: any execution error propagates instead of being swallowed.
		env.execute("kafkaTest");
	}

	/**
	 * Builds a UTF-8 row-format file sink under {@link #OUTPUTPATH} that rolls a
	 * part file every 60 minutes, after 5 minutes of inactivity, or once a part
	 * reaches 1 GiB.
	 *
	 * <p>Note: the original code passed {@code TimeUnit.MINUTES.toMillis(60000)}
	 * (≈41 days) and {@code TimeUnit.MINUTES.toMillis(5000)} (≈3.5 days); the
	 * literals were already meant as minutes, so they are corrected here.
	 *
	 * @param <T> element type written by the sink (encoded via {@code toString()})
	 * @return a configured {@link StreamingFileSink}
	 */
	private static <T> StreamingFileSink<T> buildRowFormatSink() {
		return StreamingFileSink
				.forRowFormat(new Path(OUTPUTPATH), new SimpleStringEncoder<T>("UTF-8"))
				.withRollingPolicy(DefaultRollingPolicy.builder()
						.withRolloverInterval(TimeUnit.MINUTES.toMillis(60))
						.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
						.withMaxPartSize(1024 * 1024 * 1024)
						.build())
				.build();
	}
}
